import { JsonRows, JsonAccumResult } from "./json-accumulator.js";
import { nanoid } from "nanoid";
import CP from "child_process";
import Path from "path";
import _ from "lodash";
import { watchEffect, reactive, computed } from "vue";
import OS from "os";
import { CsvLine, CsvAccumResult } from "./csv-accumulator.js";
import { WriterResult } from "./json-writer.js";
export { WriterWorker } from "./max-worker.js";

/**
 * Returns the number of logical CPUs, in Node.js or in a browser-like runtime.
 *
 * `os.cpus()` is tried first. If that call throws (for example because `os`
 * is missing or stubbed), `navigator.hardwareConcurrency` is used when a
 * global `navigator` exists.
 *
 * @returns The detected CPU count, or 1 when no usable count is available.
 */
export function cpus(): number {
  let count: number | undefined;
  try {
    count = OS.cpus().length;
  } catch {
    // `navigator` is not defined in every runtime, and `hardwareConcurrency`
    // may be missing; both cases must fall through to the default below.
    count =
      typeof navigator !== "undefined"
        ? navigator.hardwareConcurrency
        : undefined;
  }
  // Rejects undefined, NaN and 0 so callers always get a usable limit.
  return typeof count === "number" && count >= 1 ? count : 1;
}

/**
 * Base shape shared by the child-process handles declared in this module.
 */
export interface MyWorker {
  /**
   * The underlying child process instance.
   */
  worker: CP.ChildProcess;
  /**
   * Stops the child process.
   */
  terminate: () => void;
  /**
   * Stops the child process.
   */
  kill: () => void;
}

/**
 * Handle returned by `csvAcc`: the worker controls plus CSV-specific methods.
 */
export interface CsvAccFuncReturns extends MyWorker {
  /**
   * Sets the title (header) row.
   */
  setTitles: (titles: string[]) => void;
  /**
   * Appends one or more lines.
   */
  push: (...lines: CsvLine[]) => Promise<true>;
}

/**
 * Writes a CSV file line by line through a forked child process.
 *
 * The worker script `./csv-accumulator.js` is forked and immediately sent the
 * target `path`. Each `push` sends the lines with a unique id and resolves
 * when the worker replies with a `complete` message carrying the same id.
 * The returned promise stays pending if the worker never replies.
 *
 * @param path Target file path, forwarded to the worker.
 * @param rows Optional initial lines, pushed right after the worker starts.
 * @returns Handle exposing the worker, `setTitles`, `push`, `terminate` and `kill`.
 */
export function csvAcc(path: string, rows?: CsvLine[]): CsvAccFuncReturns {
  const worker = CP.fork(Path.resolve(__dirname, "./csv-accumulator.js"));
  // Many pushes can be in flight at once, each with its own temporary
  // listener, so the default listener-count warning is disabled.
  worker.setMaxListeners(Infinity);
  worker.send({ path });
  const work: CsvAccFuncReturns = {
    worker,
    setTitles(titles: string[]): void {
      worker.send({ titles });
    },
    push(...lines: CsvLine[]): Promise<true> {
      return new Promise((resolve) => {
        const id = nanoid();
        const onMessage = ({ complete, id: idCheck }: CsvAccumResult): void => {
          if (!complete || idCheck !== id) return;
          // Detach once answered; otherwise every push would leave a
          // listener behind that runs on all later worker messages.
          worker.off("message", onMessage);
          resolve(true);
        };
        worker.on("message", onMessage);
        worker.send({ lines, id });
      });
    },
    // Signal number 2 is passed to ChildProcess.kill for both aliases.
    terminate: () => worker.kill(2),
    kill: () => worker.kill(2),
  };
  if (rows) {
    void work.push(...rows);
  }
  return work;
}

/**
 * Write function of the JSON writer: accepts either `(path, json)` or
 * `(json, path?)` and resolves with `true`.
 */
export type JsonWriterMajorWrite = (
  ...args: [string, any] | [any, string?]
) => Promise<true>;
/**
 * Handle returned by `jsonWriter`; `write` and `output` share the same
 * function type.
 */
export interface JsonWriterReturns extends MyWorker {
  write: JsonWriterMajorWrite;
  output: JsonWriterMajorWrite;
}
/**
 * Writes JSON files through a forked child process.
 *
 * The worker script `./json-writer.js` is forked. Each `write` sends
 * `{ path, json, id }` and resolves when the worker replies with a `complete`
 * message carrying the same id. Arguments are matched by kind, not position:
 * the first string argument becomes `path`, the first non-string argument
 * becomes `json`. A string payload therefore cannot be sent as `json`.
 *
 * @param path Optional target path, sent to the worker right away.
 * @param obj Optional JSON-serialisable content, written immediately when
 *   both `path` and `obj` are truthy.
 * @returns Handle exposing the worker, `kill`, `terminate`, `write` and its alias `output`.
 */
export function jsonWriter(path?: string, obj?: any): JsonWriterReturns {
  const worker = CP.fork(Path.resolve(__dirname, "./json-writer.js"));
  // Many writes can be in flight at once, each with its own temporary
  // listener, so the default listener-count warning is disabled.
  worker.setMaxListeners(Infinity);
  const write: JsonWriterMajorWrite = (
    ...args: [string, any] | [any, string?]
  ) =>
    new Promise((resolve) => {
      const id = nanoid();
      const onMessage = ({ complete, id: id1 }: WriterResult): void => {
        if (id1 !== id || !complete) return;
        // Detach once answered; otherwise every write would leave a
        // listener behind that runs on all later worker messages.
        worker.off("message", onMessage);
        resolve(true);
      };
      worker.on("message", onMessage);
      worker.send({
        path: _.find(args, (e) =>
          /string/i.test(Object.prototype.toString.call(e))
        ),
        json: _.find(
          args,
          (e) => !/string/i.test(Object.prototype.toString.call(e))
        ),
        id,
      });
    });
  const work: JsonWriterReturns = {
    worker,
    // Signal number 2 is passed to ChildProcess.kill for both aliases.
    kill: () => worker.kill(2),
    terminate: () => worker.kill(2),
    write,
    output: write,
  };
  if (path) {
    worker.send({ path });
    if (obj) {
      void work.write(obj);
    }
  }
  return work;
}

/**
 * Write function of the xlsx writer: accepts either `(path, data)` or
 * `(data, path?)` and resolves with `true`.
 */
export type MajorFunctionXlsxWriter = (
  ...args: [string, any] | [CsvLine[], string?]
) => Promise<true>;
/**
 * Handle returned by `xlsxWriter`; `write` and `output` share the same
 * function type.
 */
export interface XlsxWriterReturns extends MyWorker {
  write: MajorFunctionXlsxWriter;
  output: MajorFunctionXlsxWriter;
}
/**
 * Writes xlsx files through a forked child process.
 *
 * The worker script `xlsx-writer.js` is forked. Each `write` sends
 * `{ path, json, id }` and resolves when the worker replies with a `complete`
 * message carrying the same id. Arguments are matched by kind, not position:
 * the first string argument becomes `path`, the first non-string argument is
 * sent under the `json` key.
 *
 * @param path Optional target path, sent to the worker right away.
 * @param data Optional rows, written immediately when both `path` and `data`
 *   are truthy.
 * @returns Handle exposing the worker, `kill`, `terminate`, `write` and its alias `output`.
 */
export function xlsxWriter(path?: string, data?: CsvLine[]): XlsxWriterReturns {
  const worker = CP.fork(Path.resolve(__dirname, "xlsx-writer.js"));
  // Many writes can be in flight at once, each with its own temporary
  // listener, so the default listener-count warning is disabled.
  worker.setMaxListeners(Infinity);
  const write: MajorFunctionXlsxWriter = (
    ...args: [string, any] | [CsvLine[], string?]
  ) =>
    new Promise((resolve) => {
      const id = nanoid();
      const onMessage = ({ complete, id: id1 }: WriterResult): void => {
        if (id1 !== id || !complete) return;
        // Detach once answered; otherwise every write would leave a
        // listener behind that runs on all later worker messages.
        worker.off("message", onMessage);
        resolve(true);
      };
      worker.on("message", onMessage);
      worker.send({
        path: _.find(args, (e) =>
          /string/i.test(Object.prototype.toString.call(e))
        ),
        json: _.find(
          args,
          (e) => !/string/i.test(Object.prototype.toString.call(e))
        ),
        id,
      });
    });
  const work: XlsxWriterReturns = {
    worker,
    // Signal number 2 is passed to ChildProcess.kill for both aliases.
    kill: () => worker.kill(2),
    terminate: () => worker.kill(2),
    write,
    output: write,
  };
  if (path) {
    worker.send({ path });
    if (data) {
      void work.write(path, data);
    }
  }
  return work;
}
/**
 * Append function of the JSON accumulator: accepts either `(path, rows)` or
 * `(rows, path?)` and resolves with `true`.
 */
export type MajorFuncJsonAcc = (
  ...args: [string, JsonRows] | [JsonRows, string?]
) => Promise<true>;
/**
 * Handle returned by `jsonAcc`; `assign`, `push` and `append` share the same
 * function type.
 */
export interface JsonAccReturns extends MyWorker {
  assign: MajorFuncJsonAcc;
  push: MajorFuncJsonAcc;
  append: MajorFuncJsonAcc;
}

/**
 * Appends data to a JSON file through a forked child process.
 *
 * The worker script `./json-accumulator.js` is forked. Each `assign` sends
 * `{ path, rows, id }` and resolves when the worker replies with a message
 * carrying the same id. Arguments are matched by kind, not position: the
 * first string argument becomes `path`, the first non-string argument becomes
 * `rows`.
 *
 * @param path Optional path of the JSON file, sent to the worker right away.
 * @param rows Optional initial data, assigned immediately when both `path`
 *   and `rows` are truthy.
 * @returns Handle exposing the worker, `kill`, `terminate`, `assign` and its
 *   aliases `push` and `append`.
 */
export function jsonAcc(path?: string, rows?: JsonRows): JsonAccReturns {
  // The JSON accumulator worker must be forked here: the message and result
  // types used below come from json-accumulator.js, not from the CSV worker.
  const worker = CP.fork(Path.resolve(__dirname, "./json-accumulator.js"));
  // Many calls can be in flight at once, each with its own temporary
  // listener, so the default listener-count warning is disabled.
  worker.setMaxListeners(Infinity);
  const assign: MajorFuncJsonAcc = (
    ...args: [string, JsonRows] | [JsonRows, string?]
  ) =>
    new Promise((resolve) => {
      const id = nanoid();
      const onMessage = ({ id: id1 }: JsonAccumResult): void => {
        if (id1 !== id) return;
        // Detach once answered; otherwise every call would leave a
        // listener behind that runs on all later worker messages.
        worker.off("message", onMessage);
        resolve(true);
      };
      worker.on("message", onMessage);
      // Push the rows to the worker.
      worker.send({
        path: _.find(args, (e) =>
          /string/i.test(Object.prototype.toString.call(e))
        ),
        rows: _.find(
          args,
          (e) => !/string/i.test(Object.prototype.toString.call(e))
        ),
        id,
      });
    });
  const work: JsonAccReturns = {
    worker,
    // Signal number 2 is passed to ChildProcess.kill for both aliases.
    kill: () => worker.kill(2),
    terminate: () => worker.kill(2),
    assign,
    push: assign,
    append: assign,
  };
  if (path) {
    worker.send({ path });
    if (rows) {
      void work.assign(rows);
    }
  }
  return work;
}

/**
 * One unit of work accepted by `queue`.
 */
export interface QueTask<Result> {
  /** Starts the work; must return a promise of the task's result. */
  func: () => Promise<Result>;
  /** Overwritten by `queue` with a generated id. */
  id?: string;
  /** Overwritten by `queue` with the task's position in the input array. */
  index?: number;
  /** Optional callback invoked by `queue` with the fulfilled result of `func`. */
  then?: (result: Result) => void;
}

/**
 * Progress snapshot passed to a `QueWatcher`.
 */
export interface QueProgress {
  /** Number of tasks that have not been started yet. */
  waiting: number;
  /** Number of tasks that have already completed. */
  finished: number;
  /** Number of tasks that are still running. */
  processing: number;
  /** Percentage of tasks that have completed. */
  progress: `${number}%`;
  /** Completed / total (percentage). */
  detail: `${number}/${number} (${number}%)`;
  /** Total number of tasks that were given. */
  total: number;
}

/**
 * Progress callback accepted by `queue`; receives a `QueProgress` snapshot.
 */
export interface QueWatcher {
  (arg: QueProgress): void;
}
/**
 * Runs a list of asynchronous tasks with a cap on how many are in flight at
 * the same time, and resolves once every task has settled.
 *
 * Each task's `id` and `index` are overwritten on the task objects passed in.
 * A task that rejects, or whose `func` throws synchronously, is reported via
 * `console.log`, frees its slot, and leaves its entry in the result array
 * unset. The returned promise itself never rejects.
 *
 * @param tasks Tasks to run. `func` starts the work; the optional `then` is
 *   called with the fulfilled value.
 * @param watcher Progress callback, re-invoked when the tracked counters
 *   change. Defaults to `console.log`.
 * @param maxChunk Maximum number of tasks in flight at once. Defaults to
 *   `cpus()`; a value that is not greater than zero is treated as 1.
 * @returns Promise of the results, each stored at the index of its task.
 */
export function queue<Result>(
  tasks: QueTask<Result>[] = [],
  watcher: QueWatcher = console.log,
  maxChunk: number = cpus()
): Promise<Result[]> {
  return new Promise((resolve) => {
    // A limit of zero, a negative number or NaN would never start a task and
    // the returned promise would hang, so fall back to one task at a time.
    const limit = maxChunk > 0 ? maxChunk : 1;
    const beforeProcessing = reactive(
      // Tasks that have not been started yet.
      [...tasks].map((obj, index) => {
        obj.id = nanoid();
        obj.index = index;
        return obj;
      })
    );
    const total = tasks.length;
    const processing: { id: string; promise: Promise<any> }[] = reactive([]); // tasks in flight
    const results: Result[] = reactive([]);
    const finished = computed(
      () => total - processing.length - beforeProcessing.length
    );
    const progress = computed<QueProgress["progress"]>(
      () => `${~~((finished.value * 1000) / total) / 10}%`
    );
    const detail = computed<QueProgress["detail"]>(
      () => `${finished.value}/${total} (${progress.value})`
    );

    // Removes a settled job from the in-flight list, freeing its slot.
    const release = (id: string): void => {
      const at = _.findIndex(processing, ({ id: idCheck }) => id === idCheck);
      if (at !== -1) {
        processing.splice(at, 1);
      }
    };

    // Report progress.
    watchEffect(() => {
      watcher &&
        watcher({
          waiting: beforeProcessing.length,
          finished: finished.value,
          processing: processing.length,
          progress: progress.value,
          detail: detail.value,
          total,
        });
    });
    // Resolve once nothing is waiting and nothing is in flight.
    watchEffect(
      () =>
        processing.length + beforeProcessing.length === 0 && resolve(results)
    );
    // Start waiting tasks while there is a free slot.
    watchEffect(() => {
      while (processing.length < limit && beforeProcessing.length > 0) {
        const { func, then = () => 0, index } = beforeProcessing[0]; // take the next waiting task
        beforeProcessing.shift(); // and remove it from the waiting list
        const id = nanoid();
        let promise: Promise<Result>;
        try {
          promise = Promise.resolve(func());
        } catch (error) {
          // Treat a synchronous throw like a rejected task so the scheduling
          // loop keeps going and the failure is logged below.
          promise = Promise.reject(error);
        }
        processing.push({ id, promise });
        promise
          .then(
            (result) => {
              release(id);
              if (index !== undefined) {
                results[index] = result;
              }
              return then(result); // hand the result to the task's callback
            },
            (error: unknown) => {
              // A failed task must free its slot too; otherwise the queue
              // would never drain and the returned promise would hang.
              release(id);
              throw error;
            }
          )
          .catch(console.log);
      }
    });
  });
}
